package com.atguigu.flink.wordcount;

import com.atguigu.flink.pojo.WordCount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/7
 *
 * <p>Streaming word count that builds {@code WordCount} POJOs inside a lambda-based
 * {@code flatMap} and shows how to supply the output type that the lambda cannot convey.
 *
 * <p>When a function is used only once, it can be written as either:
 * <ul>
 *   <li>an anonymous inner class: object-oriented style, where everything is an object and
 *       a function (method) has to live inside a class;</li>
 *   <li>a lambda expression: functional style, where functions are first-class citizens
 *       and do not need a host class.</li>
 * </ul>
 *
 * <p><b>InvalidTypesException</b> (illegal type): for {@code flatMap(function x)}, the return
 * type of {@code x} cannot be determined automatically because of type erasure. Flink reports:
 * <pre>
 * The return type of function 'main(Demo6_LamdaPojo.java:31)' could not be determined automatically,
 * due to type erasure.
 * You can give type information hints by using the returns(...) method on the result of the
 * transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
 * </pre>
 *
 * <p>Inside {@code x} the parameter is declared as {@code Collector<WordCount> out}. That
 * {@code out} is a {@code Collector} is known, but the type argument {@code T}
 * ({@code WordCount} here) of {@code Collector<T>} is erased at compile time, so the
 * downstream operators do not know which type is being emitted.
 *
 * <p>Solutions:
 * <ol>
 *   <li>keep the lambda, and call {@code returns(...)} on the result of {@code flatMap} to
 *       declare the type information of {@code T};</li>
 *   <li>give up on the lambda and use a top-level class, an inner class or an anonymous
 *       inner class instead.</li>
 * </ol>
 */
public class Demo6_LamdaPojo
{
    /** Host name of the socket text source. */
    private static final String SOCKET_HOST = "hadoop102";

    /** Port of the socket text source. */
    private static final int SOCKET_PORT = 8888;

    /** Value stored under the {@code rest.port} configuration key. */
    private static final int REST_PORT = 3333;

    /**
     * Builds and runs the word-count pipeline: socket source, flatMap into {@code WordCount},
     * keyBy word, running sum of {@code count}, print.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", REST_PORT);

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Default parallelism for operators that do not set their own.
        environment.setParallelism(3);

        // Unbounded stream.
        // socketTextStream is a non-parallel operator; any other value fails with
        // "The parallelism of non parallel operator must be 1."
        DataStreamSource<String> dataStreamSource = environment
            .socketTextStream(SOCKET_HOST, SOCKET_PORT)
            .setParallelism(1);

        dataStreamSource
            .flatMap(
                // The parameter types are spelled out so the compiler can determine the
                // FlatMapFunction type arguments; the alternative is to cast the lambda to
                // (FlatMapFunction<String, WordCount>).
                (String value, Collector<WordCount> out) -> {
                    for (String word : value.split(" ")) {
                        // split(" ") yields "" for blank lines and for leading or repeated
                        // spaces; those are not words and must not be counted.
                        if (!word.isEmpty()) {
                            out.collect(new WordCount(word, 1));
                        }
                    }
                })
            .setParallelism(5)
            // Type hint required because of erasure; .returns(WordCount.class) is the
            // shorter equivalent form.
            .returns(TypeInformation.of(WordCount.class))
            .keyBy(WordCount::getWord)
            // The upstream element type is a POJO, so sum(...) takes the name of the POJO
            // field to aggregate (presumably WordCount declares a "count" field).
            .sum("count").setParallelism(6)
            .print().setParallelism(2);

        environment.execute();
    }
}
